1   package org.apache.lucene.index;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements.  See the NOTICE file distributed with
6    * this work for additional information regarding copyright ownership.
7    * The ASF licenses this file to You under the Apache License, Version 2.0
8    * (the "License"); you may not use this file except in compliance with
9    * the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Collections;
23  import java.util.List;
24  import java.util.Map;
25  
26  import org.apache.lucene.analysis.Analyzer; // javadocs
27  import org.apache.lucene.index.LeafReader;
28  import org.apache.lucene.index.IndexWriter;
29  import org.apache.lucene.index.MergePolicy;
30  import org.apache.lucene.index.MergeState;
31  import org.apache.lucene.index.MergeTrigger;
32  import org.apache.lucene.index.MultiReader;
33  import org.apache.lucene.index.SegmentCommitInfo;
34  import org.apache.lucene.index.SegmentInfo;
35  import org.apache.lucene.index.SegmentInfos;
36  import org.apache.lucene.index.SegmentReader;
37  import org.apache.lucene.index.SlowCompositeReaderWrapper;
38  import org.apache.lucene.search.Sort;
39  import org.apache.lucene.store.Directory;
40  import org.apache.lucene.util.Bits;
41  import org.apache.lucene.util.InfoStream;
42  import org.apache.lucene.util.packed.PackedInts;
43  import org.apache.lucene.util.packed.PackedLongValues;
44  
45  /** A {@link MergePolicy} that reorders documents according to a {@link Sort}
46   *  before merging them. As a consequence, all segments resulting from a merge
47   *  will be sorted while segments resulting from a flush will be in the order
48   *  in which documents have been added.
49   *  <p><b>NOTE</b>: Never use this policy if you rely on
50   *  {@link IndexWriter#addDocuments(Iterable) IndexWriter.addDocuments}
51   *  to have sequentially-assigned doc IDs, this policy will scatter doc IDs.
52   *  <p><b>NOTE</b>: This policy should only be used with idempotent {@code Sort}s 
53   *  so that the order of segments is predictable. For example, using 
54   *  {@link Sort#INDEXORDER} in reverse (which is not idempotent) will make 
55   *  the order of documents in a segment depend on the number of times the segment 
56   *  has been merged.
57   *  @lucene.experimental */
58  public final class SortingMergePolicy extends MergePolicy {
59  
60    /**
61     * Put in the {@link SegmentInfo#getDiagnostics() diagnostics} to denote that
62     * this segment is sorted.
63     */
64    public static final String SORTER_ID_PROP = "sorter";
65    
66    class SortingOneMerge extends OneMerge {
67  
68      List<CodecReader> unsortedReaders;
69      Sorter.DocMap docMap;
70      LeafReader sortedView;
71      final InfoStream infoStream;
72  
73      SortingOneMerge(List<SegmentCommitInfo> segments, InfoStream infoStream) {
74        super(segments);
75        this.infoStream = infoStream;
76      }
77  
78      @Override
79      public List<CodecReader> getMergeReaders() throws IOException {
80        if (unsortedReaders == null) {
81          unsortedReaders = super.getMergeReaders();
82          if (infoStream.isEnabled("SMP")) {
83            infoStream.message("SMP", "sorting " + unsortedReaders);
84            for (LeafReader leaf : unsortedReaders) {
85              String sortDescription = getSortDescription(leaf);
86              if (sortDescription == null) {
87                sortDescription = "not sorted";
88              }
89              infoStream.message("SMP", "seg=" + leaf + " " + sortDescription);
90            }
91          }
92          // wrap readers, to be optimal for merge;
93          List<LeafReader> wrapped = new ArrayList<>(unsortedReaders.size());
94          for (LeafReader leaf : unsortedReaders) {
95            if (leaf instanceof SegmentReader) {
96              leaf = new MergeReaderWrapper((SegmentReader)leaf);
97            }
98            wrapped.add(leaf);
99          }
100         final LeafReader atomicView;
101         if (wrapped.size() == 1) {
102           atomicView = wrapped.get(0);
103         } else {
104           final CompositeReader multiReader = new MultiReader(wrapped.toArray(new LeafReader[wrapped.size()]));
105           atomicView = new SlowCompositeReaderWrapper(multiReader, true);
106         }
107         docMap = sorter.sort(atomicView);
108         sortedView = SortingLeafReader.wrap(atomicView, docMap);
109       }
110       // a null doc map means that the readers are already sorted
111       if (docMap == null) {
112         if (infoStream.isEnabled("SMP")) {
113           infoStream.message("SMP", "readers already sorted, omitting sort");
114         }
115         return unsortedReaders;
116       } else {
117         if (infoStream.isEnabled("SMP")) {
118           infoStream.message("SMP", "sorting readers by " + sort);
119         }
120         return Collections.singletonList(SlowCodecReaderWrapper.wrap(sortedView));
121       }
122     }
123     
124     @Override
125     public void setMergeInfo(SegmentCommitInfo info) {
126       Map<String,String> diagnostics = info.info.getDiagnostics();
127       diagnostics.put(SORTER_ID_PROP, sorter.getID());
128       super.setMergeInfo(info);
129     }
130 
131     private PackedLongValues getDeletes(List<CodecReader> readers) {
132       PackedLongValues.Builder deletes = PackedLongValues.monotonicBuilder(PackedInts.COMPACT);
133       int deleteCount = 0;
134       for (LeafReader reader : readers) {
135         final int maxDoc = reader.maxDoc();
136         final Bits liveDocs = reader.getLiveDocs();
137         for (int i = 0; i < maxDoc; ++i) {
138           if (liveDocs != null && !liveDocs.get(i)) {
139             ++deleteCount;
140           } else {
141             deletes.add(deleteCount);
142           }
143         }
144       }
145       return deletes.build();
146     }
147 
148     @Override
149     public MergePolicy.DocMap getDocMap(final MergeState mergeState) {
150       if (unsortedReaders == null) {
151         throw new IllegalStateException();
152       }
153       if (docMap == null) {
154         return super.getDocMap(mergeState);
155       }
156       assert mergeState.docMaps.length == 1; // we returned a singleton reader
157       final PackedLongValues deletes = getDeletes(unsortedReaders);
158       return new MergePolicy.DocMap() {
159         @Override
160         public int map(int old) {
161           final int oldWithDeletes = old + (int) deletes.get(old);
162           final int newWithDeletes = docMap.oldToNew(oldWithDeletes);
163           return mergeState.docMaps[0].get(newWithDeletes);
164         }
165       };
166     }
167 
168   }
169 
170   class SortingMergeSpecification extends MergeSpecification {
171     final InfoStream infoStream;
172     
173     SortingMergeSpecification(InfoStream infoStream) {
174       this.infoStream = infoStream;
175     }
176 
177     @Override
178     public void add(OneMerge merge) {
179       super.add(new SortingOneMerge(merge.segments, infoStream));
180     }
181 
182     @Override
183     public String segString(Directory dir) {
184       return "SortingMergeSpec(" + super.segString(dir) + ", sorter=" + sorter + ")";
185     }
186 
187   }
188 
189   /** Returns {@code true} if the given {@code reader} is sorted by the
190    *  {@code sort} given. Typically the given {@code sort} would be the
191    *  {@link SortingMergePolicy#getSort()} order of a {@link SortingMergePolicy}. */
192   public static boolean isSorted(LeafReader reader, Sort sort) {
193     String description = getSortDescription(reader);
194     if (description != null && description.equals(sort.toString())) {
195       return true;
196     }
197     return false;
198   }
199   
200   private static String getSortDescription(LeafReader reader)  {
201     if (reader instanceof SegmentReader) {
202       final SegmentReader segReader = (SegmentReader) reader;
203       final Map<String, String> diagnostics = segReader.getSegmentInfo().info.getDiagnostics();
204       if (diagnostics != null) {
205         return diagnostics.get(SORTER_ID_PROP);
206       }
207     } else if (reader instanceof FilterLeafReader) {
208       return getSortDescription(FilterLeafReader.unwrap(reader));
209     }
210     return null;
211   }
212 
213   private MergeSpecification sortedMergeSpecification(MergeSpecification specification, InfoStream infoStream) {
214     if (specification == null) {
215       return null;
216     }
217     MergeSpecification sortingSpec = new SortingMergeSpecification(infoStream);
218     for (OneMerge merge : specification.merges) {
219       sortingSpec.add(merge);
220     }
221     return sortingSpec;
222   }
223 
224   final MergePolicy in;
225   final Sorter sorter;
226   final Sort sort;
227 
228   /** Create a new {@code MergePolicy} that sorts documents with the given {@code sort}. */
229   public SortingMergePolicy(MergePolicy in, Sort sort) {
230     this.in = in;
231     this.sorter = new Sorter(sort);
232     this.sort = sort;
233   }
234 
235   /** Return the {@link Sort} order that is used to sort segments when merging. */
236   public Sort getSort() {
237     return sort;
238   }
239 
240   @Override
241   public MergeSpecification findMerges(MergeTrigger mergeTrigger,
242       SegmentInfos segmentInfos, IndexWriter writer) throws IOException {
243     return sortedMergeSpecification(in.findMerges(mergeTrigger, segmentInfos, writer), writer.infoStream);
244   }
245 
246   @Override
247   public MergeSpecification findForcedMerges(SegmentInfos segmentInfos,
248       int maxSegmentCount, Map<SegmentCommitInfo,Boolean> segmentsToMerge, IndexWriter writer)
249       throws IOException {
250     return sortedMergeSpecification(in.findForcedMerges(segmentInfos, maxSegmentCount, segmentsToMerge, writer), writer.infoStream);
251   }
252 
253   @Override
254   public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, IndexWriter writer)
255       throws IOException {
256     return sortedMergeSpecification(in.findForcedDeletesMerges(segmentInfos, writer), writer.infoStream);
257   }
258 
259   @Override
260   public boolean useCompoundFile(SegmentInfos segments,
261       SegmentCommitInfo newSegment, IndexWriter writer) throws IOException {
262     return in.useCompoundFile(segments, newSegment, writer);
263   }
264 
265   @Override
266   protected long size(SegmentCommitInfo info, IndexWriter writer) throws IOException {
267     return in.size(info, writer);
268   }
269 
270   @Override
271   public String toString() {
272     return "SortingMergePolicy(" + in + ", sorter=" + sorter + ")";
273   }
274 }